Snowflake Bulk Ingest with Storage Integration
In big data processing, you sometimes need to load huge amounts of data in batches, and bulk ingestion is designed for exactly this scenario. Snowflake Bulk Ingest, used in the data integration stage, helps you load batches of data from files available in a data lake such as Amazon S3. Bulk Ingest loads a chunk of data every time you run the pipeline: the data is first pushed into a landing layer and then sent to the unification layer.
Calibo's Data Pipeline Studio (DPS) supports bulk ingestion using Snowflake Bulk Ingest in the data integration stage, with Amazon S3 as the data lake in the data source stage and Snowflake as the target data lake. Following is an example of a Snowflake bulk ingest data pipeline:
How Snowflake Bulk Ingest works
With Snowflake Bulk Ingest, every time you run the data pipeline, data from S3 is ingested into the data lake. During data ingestion, the data is first pushed into a landing layer. Depending on the use case, you can perform operations such as append, overwrite, or merge on the data. The processed data is then pushed into the unification layer. This process requires credentials to access the Amazon S3 bucket, as well as read and write permissions on the Snowflake objects. To avoid sharing credentials directly, you can use Storage Integration.
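For illustration only, this flow maps to ordinary Snowflake SQL: a batch is copied from the S3 stage into the landing table and then applied to the unification table. The minimal Python sketch below uses the snowflake-connector-python package with hypothetical connection details and object names (CUSTOMERS_LANDING, CUSTOMERS, CUSTOMERS_LANDING_STAGE); it is not the exact SQL that DPS generates.

```python
import snowflake.connector  # pip install snowflake-connector-python

# Hypothetical connection details and object names -- not the SQL DPS generates.
conn = snowflake.connector.connect(
    account="myorg-myaccount", user="MY_USER", password="***",
    warehouse="MY_WH", database="MY_DB", schema="LANDING",
)
cur = conn.cursor()

# 1. Land the batch: copy files from the external S3 stage into the landing table.
cur.execute("""
    COPY INTO CUSTOMERS_LANDING
    FROM @CUSTOMERS_LANDING_STAGE
    FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)
""")

# 2. Unify: apply the landed rows to the unification-layer table
#    (append shown here; overwrite and merge are the other operation types).
cur.execute("""
    INSERT INTO MY_DB.UNIFICATION.CUSTOMERS (ID, NAME, EMAIL)
    SELECT ID, NAME, EMAIL FROM CUSTOMERS_LANDING
""")

cur.close()
conn.close()
```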
What is Storage Integration?
Storage Integration is a Snowflake object that lets you connect from Snowflake to your AWS account using the IAM service. You can specify allowed and blocked storage locations, which provides enhanced security for the complete data ingestion operation.
See Configuring a Snowflake storage integration to access Amazon S3.
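For reference, a storage integration is created in Snowflake with the CREATE STORAGE INTEGRATION command, typically by a user with the ACCOUNTADMIN role. The sketch below uses hypothetical names for the integration, the IAM role ARN, and the buckets; the linked guide covers the full procedure, including the AWS trust-policy setup.

```python
import snowflake.connector  # pip install snowflake-connector-python

# Hypothetical account, role ARN, and bucket paths.
conn = snowflake.connector.connect(
    account="myorg-myaccount", user="MY_ADMIN", password="***", role="ACCOUNTADMIN",
)
cur = conn.cursor()

# Snowflake assumes the given IAM role; stages built on this integration may
# only reference the allowed locations (blocked locations are optional).
cur.execute("""
    CREATE STORAGE INTEGRATION S3_LAKE_INT
      TYPE = EXTERNAL_STAGE
      STORAGE_PROVIDER = 'S3'
      ENABLED = TRUE
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/snowflake-s3-access'
      STORAGE_ALLOWED_LOCATIONS = ('s3://my-data-lake/landing/')
      STORAGE_BLOCKED_LOCATIONS = ('s3://my-data-lake/restricted/')
""")

cur.close()
conn.close()
```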
Prerequisites for using Snowflake Bulk Ingest in the data integration layer
Ensure that you meet the following prerequisites:
- You must have Amazon S3 and Snowflake data lakes configured in the Lazsa Platform.
- You must have a storage integration created in Snowflake.
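To check the second prerequisite, you can describe the integration from Snowflake. DESC INTEGRATION lists its properties, including the IAM user ARN and external ID that the AWS role behind the integration must trust; the integration name below is hypothetical.

```python
import snowflake.connector  # pip install snowflake-connector-python

conn = snowflake.connector.connect(
    account="myorg-myaccount", user="MY_USER", password="***",
)
cur = conn.cursor()

# Each row is (property, property_type, property_value, property_default).
# Look for STORAGE_AWS_IAM_USER_ARN, STORAGE_AWS_EXTERNAL_ID, and the
# allowed/blocked locations of the (hypothetical) integration.
cur.execute("DESC INTEGRATION S3_LAKE_INT")
for prop, _ptype, value, _default in cur.fetchall():
    print(f"{prop} = {value}")

cur.close()
conn.close()
```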
To create a data integration job for Snowflake bulk ingest
- On the home page of DPS, add the following stages. Your pipeline looks like this:
  - Data Lake: Amazon S3
  - Data Integration: Snowflake Bulk Ingest
  - Data Lake: Snowflake
- Configure the Amazon S3 and Snowflake nodes.
- Click the data integration node and click Create Job.
- To create the data integration job, provide the following inputs:
  - Job Name: Provide a name for the data integration job.
  - Node Rerun Attempts: The number of times the pipeline run is attempted on this node in case of failure. By default, the setting defined at the pipeline level is used. To change the default setting, select an option from the dropdown.
  - Datastore: This is populated based on the datastore that you configure for the data lake (source) node.
  - Choose Source Format: Select one of the following options (representative Snowflake file formats are sketched after these steps):
    - CSV
    - JSON
    - Parquet
  - Add Base Path:
    - Click Add Base Path.
    - In the Choose Base Path screen, select a path and then click Select.
- Click Next.
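The source format chosen above determines how the landed files are parsed, which in Snowflake terms corresponds to a file format. The definitions below are only representative examples with hypothetical names and options; DPS derives the actual file format for you (see File Format on the next screen).

```python
import snowflake.connector  # pip install snowflake-connector-python

conn = snowflake.connector.connect(
    account="myorg-myaccount", user="MY_USER", password="***",
    warehouse="MY_WH", database="MY_DB", schema="LANDING",
)
cur = conn.cursor()

# Representative file formats for the three supported source formats.
cur.execute("""
    CREATE OR REPLACE FILE FORMAT CSV_FMT
      TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1
""")
cur.execute("""
    CREATE OR REPLACE FILE FORMAT JSON_FMT
      TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE
""")
cur.execute("CREATE OR REPLACE FILE FORMAT PARQUET_FMT TYPE = 'PARQUET'")

cur.close()
conn.close()
```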
- On the Landing Layer Details screen, provide the following inputs:
  - Database – This is populated based on the selected database.
  - Landing Layer Schema – This is populated based on the selected schema.
  - Create/Choose Landing Layer Table – Either select a table from the dropdown list or create a new table in the landing layer, where the data is stored temporarily.
  - File Format – This is populated based on the file format that you selected for the source stage.
  - Stage Name for S3 – This is derived from the landing layer table that you create or choose; the suffix Stage is added to it to form the stage name for S3.
  - Stream on Landing Table – This is derived from the landing layer table that you create or choose; the suffix Stream is added to it to form the stream name for the landing table. (The corresponding stage and stream objects are sketched after this step.)
- Click Next.
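The Stage Name for S3 and Stream on Landing Table fields correspond to an external stage over the S3 base path and a stream on the landing table, so each run can pick up only newly landed rows. A rough SQL equivalent is sketched below with illustrative names (the suffix convention shown here is only approximate) and an assumed storage integration S3_LAKE_INT.

```python
import snowflake.connector  # pip install snowflake-connector-python

conn = snowflake.connector.connect(
    account="myorg-myaccount", user="MY_USER", password="***",
    warehouse="MY_WH", database="MY_DB", schema="LANDING",
)
cur = conn.cursor()

# External stage over the S3 base path, authenticated through the storage
# integration instead of embedded credentials (all names are illustrative).
cur.execute("""
    CREATE OR REPLACE STAGE CUSTOMERS_LANDING_STAGE
      URL = 's3://my-data-lake/landing/customers/'
      STORAGE_INTEGRATION = S3_LAKE_INT
      FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)
""")

# Stream on the landing table, so later steps can read only the rows
# added since the previous pipeline run.
cur.execute("CREATE OR REPLACE STREAM CUSTOMERS_LANDING_STREAM ON TABLE CUSTOMERS_LANDING")

cur.close()
conn.close()
```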
The Datastore and Warehouse are populated based on the options selected.
Provide the following information for Unification Layer Details:
- Database – This is populated based on the selected database.
- Target Schema – This is populated based on the selected schema.
- Create/Choose Unification Layer Table – Either select a table from the dropdown list or create a new table in the unification layer where the processed data is stored.
- Operation Type – Select the type of operation to be performed on the source data during the job run. Choose one of the following options (a sketch of the corresponding SQL follows this list):
  - Append – Adds new data at the end of the table without erasing the existing content.
  - Overwrite – Replaces the existing data in the table.
  - Merge – Combines the existing and new data in the table.
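Each operation type corresponds roughly to a different SQL pattern for moving rows from the landing layer into the unification table. The sketch below shows one plausible form of each, using hypothetical tables keyed on a single ID column; only one of these would run in a given job, and the SQL that DPS actually generates may differ.

```python
import snowflake.connector  # pip install snowflake-connector-python

conn = snowflake.connector.connect(
    account="myorg-myaccount", user="MY_USER", password="***",
    warehouse="MY_WH", database="MY_DB", schema="UNIFICATION",
)
cur = conn.cursor()

# Append: add the landed rows without touching existing data.
cur.execute("""
    INSERT INTO CUSTOMERS (ID, NAME, EMAIL)
    SELECT ID, NAME, EMAIL FROM MY_DB.LANDING.CUSTOMERS_LANDING
""")

# Overwrite: replace the table contents with the landed rows.
cur.execute("""
    INSERT OVERWRITE INTO CUSTOMERS (ID, NAME, EMAIL)
    SELECT ID, NAME, EMAIL FROM MY_DB.LANDING.CUSTOMERS_LANDING
""")

# Merge: update matching rows and insert new ones, keyed on ID here.
cur.execute("""
    MERGE INTO CUSTOMERS t
    USING MY_DB.LANDING.CUSTOMERS_LANDING s ON t.ID = s.ID
    WHEN MATCHED THEN UPDATE SET t.NAME = s.NAME, t.EMAIL = s.EMAIL
    WHEN NOT MATCHED THEN INSERT (ID, NAME, EMAIL) VALUES (s.ID, s.NAME, s.EMAIL)
""")

cur.close()
conn.close()
```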
- This screen lets you map the columns and datatypes of the selected source table to the columns and datatypes of the target table.
- To add custom columns to the target table, provide the following information under Add Custom Columns (a sketch of how such columns might be populated follows this section):
  - Column Name – Provide a name for the custom column.
  - Type – Select Static Parameter or System Parameter. If you choose System Parameter, select a parameter value from the dropdown.
  - Value – Provide a value or select one from the dropdown, depending on the type you selected.
- Click Add.
Under Added Custom Columns, you can perform the following actions:
- Click the pencil icon to edit the details of a custom column. Make the required changes and click Update.
- Click the delete icon to delete a custom column that you have added.
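As an example of custom columns, a static parameter might tag every row with a fixed source name, while a system parameter might capture the load timestamp. The sketch below shows one way such columns could be populated during an append; the column names, the values, and the exact behaviour of DPS's static and system parameters are assumptions.

```python
import snowflake.connector  # pip install snowflake-connector-python

conn = snowflake.connector.connect(
    account="myorg-myaccount", user="MY_USER", password="***",
    warehouse="MY_WH", database="MY_DB", schema="UNIFICATION",
)
cur = conn.cursor()

# Hypothetical custom columns on the target table:
#   SOURCE_SYSTEM -- a static value supplied in the job definition.
#   LOADED_AT     -- a system value captured at load time.
cur.execute("""
    INSERT INTO CUSTOMERS (ID, NAME, EMAIL, SOURCE_SYSTEM, LOADED_AT)
    SELECT ID, NAME, EMAIL, 'S3_BULK_INGEST', CURRENT_TIMESTAMP()
    FROM MY_DB.LANDING.CUSTOMERS_LANDING
""")

cur.close()
conn.close()
```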
Select a storage integration from the dropdown list.
A storage integration is an object created in Snowflake that stores a generated identity and access management (IAM) user for your S3 cloud storage, along with an optional set of allowed or blocked storage locations (that is, buckets).
Ensure that the storage integration that you select has access to the S3 bucket selected in the source stage.
For information on how to create a storage integration, see Configuring a Snowflake storage integration to access Amazon S3.
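One way to confirm that the selected storage integration can reach the source bucket is to grant its usage to the role the job runs under and list the files through a stage built on it. The role, integration, and stage names below are hypothetical.

```python
import snowflake.connector  # pip install snowflake-connector-python

conn = snowflake.connector.connect(
    account="myorg-myaccount", user="MY_ADMIN", password="***", role="ACCOUNTADMIN",
)
cur = conn.cursor()

# Allow the job's role to create and use stages that reference the integration.
cur.execute("GRANT USAGE ON INTEGRATION S3_LAKE_INT TO ROLE DPS_JOB_ROLE")

# Listing the stage succeeds only if the integration's IAM role can read the
# bucket and the path falls within STORAGE_ALLOWED_LOCATIONS.
cur.execute("LIST @MY_DB.LANDING.CUSTOMERS_LANDING_STAGE")
for name, size, _md5, _last_modified in cur.fetchall():
    print(name, size)

cur.close()
conn.close()
```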
What's next? Snowflake Stream Ingest with Storage Integration